package cn.rdtimes.wolfdsp.core.job;

import cn.rdtimes.wolfdsp.core.data.JobGraphInstance;
import cn.rdtimes.wolfdsp.core.data.JobGraphState;
import cn.rdtimes.wolfdsp.core.job.params.DefaultParameters;
import cn.rdtimes.wolfdsp.core.job.params.Parameters;
import cn.rdtimes.wolfdsp.core.util.StringUtil;

import java.util.Arrays;
import java.util.Map;
import java.util.Set;

/**
 * A job-graph (workflow) instance while it is being scheduled and executed.
 * <p>
 * Wraps a {@link JobGraphInstance}, keeps the array of all task instances that
 * belong to it, the indexes of its direct child tasks, a counter of finished
 * tasks, and the job-graph level input parameters parsed from the instance's
 * parameter JSON.
 *
 * @author BZ
 */
public class PerformJobGraphInstance {
    private final JobGraphInstance jobGraphInstance;
    private final Parameters<String, Object> inputParams = new DefaultParameters(Parameters.ParameterKind.JOB_GRAPH);

    // Both arrays are expected to be fully built before the job graph starts running.
    // Indexes (into allTasks) of the direct child tasks.
    private int[] childTaskIndexes = new int[0];
    // All task instances. Per the original design note a slot may be set to null
    // once its instance has completed, so readers must tolerate null entries.
    private PerformTaskInstance[] allTasks = new PerformTaskInstance[0];

    // Number of finished task instances; when it reaches allTasks.length the
    // whole job graph is considered complete.
    private volatile int completedTaskCount;

    /**
     * Creates the runtime wrapper and, when the instance carries a non-empty
     * parameter JSON, parses it into the input parameters.
     *
     * @param instance the persisted job-graph instance to wrap
     */
    PerformJobGraphInstance(JobGraphInstance instance) {
        this.jobGraphInstance = instance;
        String json = instance.getParamJson();
        if (!StringUtil.isEmpty(json)) {
            inputParams.parseJson(json);
        }
    }

    /**
     * @return the job id of the wrapped job-graph instance
     */
    final String getJobId() {
        return jobGraphInstance.getJobId();
    }

    /**
     * @return the instance id of the wrapped job-graph instance
     */
    final String getInstanceId() {
        return jobGraphInstance.getInstanceId();
    }

    /**
     * Registers a task as a direct child by recording its task index.
     * The task's index is read from the task itself, so the task should already
     * have been registered through {@link #addTask(PerformTaskInstance)}.
     *
     * @param instance the direct child task
     */
    final synchronized void addChildTask(PerformTaskInstance instance) {
        childTaskIndexes = Arrays.copyOf(childTaskIndexes, childTaskIndexes.length + 1);
        childTaskIndexes[childTaskIndexes.length - 1] = instance.getTaskIndex();
    }

    /**
     * Appends a task to the array of all tasks and stores the slot position
     * back into the task as its task index.
     *
     * @param instance the task instance to register
     */
    final synchronized void addTask(PerformTaskInstance instance) {
        allTasks = Arrays.copyOf(allTasks, allTasks.length + 1);
        int index = allTasks.length - 1;
        allTasks[index] = instance;
        instance.setTaskIndex(index);
    }

    /**
     * @return the number of direct child tasks
     */
    final int getChildTaskCount() {
        return childTaskIndexes.length;
    }

    /**
     * @return the number of slots in the array of all tasks
     */
    final int getTaskCount() {
        return allTasks.length;
    }

    /**
     * Builds a new array holding the direct child tasks, in registration order.
     *
     * @return a freshly allocated array; an entry is whatever currently sits in
     * the corresponding slot of the all-tasks array
     */
    final synchronized PerformTaskInstance[] getChildTask() {
        PerformTaskInstance[] ret = new PerformTaskInstance[childTaskIndexes.length];
        for (int i = 0; i < childTaskIndexes.length; i++) {
            ret[i] = allTasks[childTaskIndexes[i]];
        }
        return ret;
    }

    /**
     * Records that one task instance finished normally (completed or skipped).
     * <p>
     * If every task had already been counted, nothing is changed and
     * {@code true} is returned. Otherwise the completed or skipped counter of
     * the job-graph instance is incremented, and when the finished count reaches
     * the total number of tasks the end time is set and the state becomes
     * {@link JobGraphState#FINISHED}.
     * <p>
     * Failed or abnormal tasks are not handled here; they are handled by the caller.
     *
     * @param taskInstance the finished task instance (currently not read by this method)
     * @param isSkip       {@code true} if the task instance was skipped
     * @return {@code true} if all tasks are finished, so the job graph may need to be ended
     */
    final synchronized boolean addCompletedTask(PerformTaskInstance taskInstance, boolean isSkip) {
        if (completedTaskCount >= allTasks.length) return true;

        if (!isSkip) {
            jobGraphInstance.incCompletedCount();
        } else {
            jobGraphInstance.incSkipCount();
        }
        if (++completedTaskCount >= allTasks.length) {
            jobGraphInstance.setEndTime(System.currentTimeMillis());
            jobGraphInstance.setJobGraphState(JobGraphState.FINISHED);
            return true;
        }
        return false;
    }

    /**
     * Looks up a task instance by its task id.
     * <p>
     * For more than 16 tasks the first half is scanned forwards and then the
     * second half backwards; otherwise the array is scanned forwards. Null
     * slots are skipped instead of causing a {@link NullPointerException}.
     *
     * @param taskId the task id to look for
     * @return the matching task instance, or {@code null} if none matches
     */
    final PerformTaskInstance findPerformTaskInstance(String taskId) {
        PerformTaskInstance[] pti = allTasks;
        int size = pti.length;
        if (size > 16) {
            size >>= 1;
            for (int i = 0; i < size; i++) {
                if (hasTaskId(pti[i], taskId)) {
                    return pti[i];
                }
            }
            for (int i = pti.length - 1; i >= size; i--) {
                if (hasTaskId(pti[i], taskId)) {
                    return pti[i];
                }
            }
        } else {
            for (int i = 0; i < size; i++) {
                if (hasTaskId(pti[i], taskId)) {
                    return pti[i];
                }
            }
        }
        return null;
    }

    // True when the slot is occupied and its task id equals the requested one.
    private static boolean hasTaskId(PerformTaskInstance task, String taskId) {
        return task != null && task.getTaskId().equals(taskId);
    }

    /**
     * @return the internal array of all task instances (not a copy)
     */
    final PerformTaskInstance[] getAllTasks() {
        return allTasks;
    }

    /**
     * Computes the completion percentage of the job graph.
     *
     * @return finished tasks as a whole-number percentage of all tasks
     * (truncated), or 0 when there are no tasks
     */
    final int getProgress() {
        int c = completedTaskCount;
        int len = allTasks.length;
        if (len == 0) {
            return 0;
        }
        // Multiply before dividing: (c / len) * 100 truncates to 0 for any
        // partially finished job. long arithmetic rules out overflow.
        return (int) ((long) c * 100 / len);
    }

    /**
     * @return the wrapped job-graph instance
     */
    final JobGraphInstance getJobGraphInstance() {
        return jobGraphInstance;
    }

    /**
     * @return the job-graph level input parameters (the live object, not a copy)
     */
    final Parameters<String, Object> getInputParams() {
        return inputParams;
    }

    /**
     * Overwrites existing input parameters with values from {@code params}.
     * Only keys that are already present in the input parameters are updated;
     * unknown keys are ignored.
     *
     * @param params candidate values; may be {@code null} or empty
     * @return {@code true} if at least one input parameter was overwritten
     */
    final synchronized boolean mergeJobGraphParams(Map<String, Object> params) {
        if (inputParams.size() == 0) return false;

        boolean isChanged = false;
        if (params != null && params.size() > 0) {
            // Also lock the parameter object itself, which is handed out by getInputParams().
            synchronized (inputParams) {
                Set<Map.Entry<String, Object>> entries = params.entrySet();
                for (Map.Entry<String, Object> entry : entries) {
                    if (inputParams.containsKey(entry.getKey())) {
                        inputParams.put(entry.getKey(), entry.getValue());
                        isChanged = true;
                    }
                }
            }
        }
        return isChanged;
    }
}
